RxJava2.x 订阅流程解析
前言
RxJava + Retrofit + okhttp 的网络请求框架已经成为 Android 开发的绝对主流框架,其带来的简洁、方便的调用形式早已征服了每个 Android 开发者,那么我们完全有必要去深入研究其实现原理。
文章的初衷就是希望能给你带来 RxJava2.x 的订阅流程更深入的解析。
相关概念
- ReactiveX 是一个基于一系列可观察的异步和基础事件(composing asynchronous and event-based programs)编程组成的一个库,而 RxJava 则是这个库基于 Java VM 实现的,当然还有 RxJs,RxKotlin 等基于其他语言实现的库。RxJava2 则是为了遵循 Reactive-Streams specification 规范进行了完全的重写,而且无法与 RxJava 共存,但其实现原理都是相同的,所以本文将通过 RxJava2 的源码来进行深入解读。
- 观察者模式 是一种运用十分广泛的设计模式,它有个两个概念观察者及被观察者,观察对象通过注册或者订阅被观察对象建立两者的联系,那么被观察的对象发生的特定行为会及时通知观察对象进行相应。在 Android 开发中 View 的 OnClickListener 就是典型的观察者模式,被观察者目标view 的点击事件触发会触发观察者 OnClickListener 的 onClick 方法响应。
订阅流程
已 RxJava2.x 为例,下面是一个简单的 RxJava 订阅流程,我们忽略 RxJava2.x 因背压处理的新观察者模型,就以 RxJava1.x 的观察者模型 Observable ( 被观察者 ) / Observer ( 观察者 ) 为例:
1 | Observable |
这是最简单的 RxJava2.x 订阅流程,Observable.create() 方法创建了一个被观察者对象,由 ObservableOnSubscribe 对象作为事件源,而 Observer 作为观察者通过 subscribe() 方法订阅注册到被观察者,此时事件源发出的事件将会被 Observer 接受输出。
订阅流程解析
我们知道订阅是由 Observable.subscribe 方法触发
Observable.subscribe 方法
1 | public final void subscribe(Observer<? super T> observer) { |
去除一些检验包装代码后,订阅事件的核心是由 Observable.subscribeActual 方法完成的,而这个方法是一个抽象方法,具体逻辑则是由子类实现,那我们就进入该流程中的 Observable.create 方法创建出的子类 ObservableCreate 中的 subscribeActual 具体实现
1 |
|
通过子类实现的订阅方法完成了如下逻辑,当被观察者被订阅时,会触发被观察者内部封装事件源的相关方法,然后再触发观察者的监听方法
RxJava 可以通过插入操作符对事件流进行操作,那么有操作符的订阅流程是怎么样的呢
1 | ... 被观察者 |
Observable.map 方法
1 | public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { |
最简单的操作符 map 返回的则是一个 ObservableMap 对象,我们需要去查找 ObservableMap.subscribeActual 方法
1 |
|
其中 source 是在构造函数中传入的 this 对象,即上一个 Observable 对象,在 map 操作中会生成一个新的 Observable,在 subscribeActual 实现方法中会对上一个 Observable 进行订阅。同时,会对 Observer 也会进行包装,为的是真正的数据转化工作
用图的方式更直白一些
订阅流程就比较直观地展示,只有在 Observable 被订阅时才会向上订阅一直到事件源,同时会根据操作符类型对 Observer 做一些转换封装,事件源调用后,事件会通过 Observer 层层调用最终调用到最初订阅时传入的 Observer
结语
虽然只是通过一个简单的流程分析了订阅流程,但其他的模型也是类似的,只不过可能其他类型的 Observable 可能实现了更为复杂的控制流程,譬如说实现一些缓存队列来保证数据的缓存。最终通过操作符的转换形成一条调用链,当被订阅时会沿着链子向上调用,然后事件向下回调传递
接下去,我会分析 RxJava2.x 另外一个重要特性线程切换,并简单讲述一些系统提供的 Schedulers 特性和实现原理